Character Level RNN Exercise
Character-Level LSTM in PyTorch
In this notebook, I'll construct a character-level LSTM with PyTorch. The network will train character by character on some text, then generate new text character by character. As an example, I will train on Anna Karenina. This model will be able to generate new text based on the text from the book!
This network is based on Andrej Karpathy's post on RNNs and his implementation in Torch. Below is the general architecture of the character-wise RNN.
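To make the "character by character" idea concrete: during training, the target for each input character is simply the next character in the text, so the targets are just the inputs shifted by one position. A toy illustration (the string here is only an example, not taken from the book):

# toy illustration of the character-level objective:
# the network reads each input character and learns to predict the one that follows it
text_example = "Anna"
inputs  = list(text_example[:-1])   # ['A', 'n', 'n']
targets = list(text_example[1:])    # ['n', 'n', 'a']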
Set Up
First, let's load in the resources we need for data loading and model creation.
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
with open('data/anna.txt', 'r') as f:
    text = f.read()
# check out the first 100 characters
text[:100]
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}
encoded = np.array([char2int[ch] for ch in text])
encoded[:100]
def one_hot_encode(arr, n_labels):

    # Initialize the encoded array
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)

    # Fill the appropriate elements with ones
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.

    # Finally reshape it to get back to the original array
    one_hot = one_hot.reshape((*arr.shape, n_labels))

    return one_hot
test_seq = np.array([[3, 5, 1]])
one_hot = one_hot_encode(test_seq, 8)
print(one_hot)
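For reference, if one_hot_encode is working, the test above should print a 1 x 3 x 8 array with a single 1 per row, in the column given by each input index:

[[[0. 0. 0. 1. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 1. 0. 0.]
  [0. 1. 0. 0. 0. 0. 0. 0.]]]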
def get_batches(arr, batch_size, seq_length):
    '''Create a generator that returns batches of size
       batch_size x seq_length from arr.

       Arguments
       ---------
       arr: Array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence
    '''

    ## TODO: Get the number of batches we can make
    n_batches = 

    ## TODO: Keep only enough characters to make full batches
    arr = 

    ## TODO: Reshape into batch_size rows
    arr = 

    ## TODO: Iterate over the batches using a window of size seq_length
    for n in range(0, arr.shape[1], seq_length):
        # The features
        x = 
        # The targets, shifted by one
        y = 
        yield x, y
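If you get stuck, here is one possible way to fill in the TODOs above. It is a sketch of the usual approach (trim to full batches, reshape into batch_size rows, slide a seq_length window, shift targets by one), not the only valid solution:

def get_batches(arr, batch_size, seq_length):
    '''One possible completion of the exercise function above (a sketch).'''
    batch_size_total = batch_size * seq_length
    # total number of full batches we can make
    n_batches = len(arr) // batch_size_total

    # keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size_total]
    # reshape into batch_size rows
    arr = arr.reshape((batch_size, -1))

    # iterate over the array, one seq_length window at a time
    for n in range(0, arr.shape[1], seq_length):
        # the features
        x = arr[:, n:n+seq_length]
        # the targets, shifted by one character
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            # wrap around at the very end of the array
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y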
batches = get_batches(encoded, 8, 50)
x, y = next(batches)

# printing out the first 10 items in a sequence
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])
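A quick sanity check, assuming the usual shifted-by-one target convention from the sketch above: every target column except the last should match the next input column.

# sanity check (assumes get_batches shifts targets by one character)
assert (y[:, :-1] == x[:, 1:]).all()
print('targets look like inputs shifted by one character')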
# check if GPU is available
train_on_gpu = torch.cuda.is_available()
if(train_on_gpu):
    print('Training on GPU!')
else:
    print('No GPU available, training on CPU; consider making n_epochs very small.')
class CharRNN(nn.Module):

    def __init__(self, tokens, n_hidden=256, n_layers=2,
                 drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        # creating character dictionaries
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        ## TODO: define the layers of the model

    def forward(self, x, hidden):
        ''' Forward pass through the network.
            These inputs are x, and the hidden/cell state `hidden`. '''

        ## TODO: Get the outputs and the new hidden state from the lstm

        # return the final output and the hidden state
        return out, hidden

    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x batch_size x n_hidden,
        # initialized to zero, for the hidden state and cell state of the LSTM
        weight = next(self.parameters()).data

        if (train_on_gpu):
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_())

        return hidden
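If you want something to compare against, here is one possible completion of the model. The specific layer choices (an LSTM over one-hot inputs, dropout, and a fully connected output layer) and the class name CharRNNSolution are my assumptions; other architectures are equally valid for this exercise.

class CharRNNSolution(nn.Module):
    ''' A possible completion of CharRNN (a sketch, not the required answer). '''

    def __init__(self, tokens, n_hidden=256, n_layers=2, drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        # LSTM over one-hot character vectors, returning n_hidden features per step
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        # fully connected layer maps hidden features to scores over all characters
        self.fc = nn.Linear(n_hidden, len(self.chars))

    def forward(self, x, hidden):
        # get the outputs and the new hidden state from the LSTM
        r_output, hidden = self.lstm(x, hidden)
        out = self.dropout(r_output)
        # stack up LSTM outputs: one row per character position
        out = out.contiguous().view(-1, self.n_hidden)
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size):
        # two zero tensors of size n_layers x batch_size x n_hidden (hidden and cell state)
        weight = next(self.parameters()).data
        if train_on_gpu:
            return (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                    weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        return (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                weight.new(self.n_layers, batch_size, self.n_hidden).zero_())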
def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Training a network

        Arguments
        ---------
        net: CharRNN network
        data: text data to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss
    '''
    net.train()

    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    if(train_on_gpu):
        net.cuda()

    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            # one-hot encode the data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

            if(train_on_gpu):
                inputs, targets = inputs.cuda(), targets.cuda()

            # create new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])

            # zero accumulated gradients
            net.zero_grad()

            # get the output from the model
            output, h = net(inputs, h)

            # calculate the loss and perform backprop
            loss = criterion(output, targets.view(batch_size*seq_length))
            loss.backward()
            # clip_grad_norm_ helps prevent the exploding gradient problem in RNNs/LSTMs
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            # loss stats
            if counter % print_every == 0:
                # get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                for x, y in get_batches(val_data, batch_size, seq_length):
                    # one-hot encode the data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)

                    # create new variables for the hidden state
                    val_h = tuple([each.data for each in val_h])

                    inputs, targets = x, y
                    if(train_on_gpu):
                        inputs, targets = inputs.cuda(), targets.cuda()

                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, targets.view(batch_size*seq_length))

                    val_losses.append(val_loss.item())

                net.train()  # reset to train mode after iterating through validation data

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))
## TODO: set your model hyperparameters
n_hidden = 
n_layers = 

net = CharRNN(chars, n_hidden, n_layers)
print(net)

batch_size = 
seq_length = 
n_epochs =   # start small if you are just testing initial behavior
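For reference, one reasonable starting configuration; these particular values are assumptions and should be tuned to your hardware and patience:

# hypothetical starting values, not the required answer
n_hidden = 512
n_layers = 2
batch_size = 128
seq_length = 100
n_epochs = 20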
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)
model_name = 'rnn_x_epoch.net'
checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)
def predict(net, char, h=None, top_k=None):
    ''' Given a character, predict the next character.
        Returns the predicted character and the hidden state.
    '''

    # tensor inputs
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, len(net.chars))
    inputs = torch.from_numpy(x)

    if(train_on_gpu):
        inputs = inputs.cuda()

    # detach hidden state from history
    h = tuple([each.data for each in h])
    # get the output of the model
    out, h = net(inputs, h)

    # get the character probabilities
    p = F.softmax(out, dim=1).data
    if(train_on_gpu):
        p = p.cpu()  # move to cpu

    # get the top characters to sample from
    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        p, top_ch = p.topk(top_k)
        top_ch = top_ch.numpy().squeeze()

    # select the likely next character with some element of randomness
    p = p.numpy().squeeze()
    char = np.random.choice(top_ch, p=p/p.sum())

    # return the predicted character and the hidden state
    return net.int2char[char], h
def sample(net, size, prime='The', top_k=None):

    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()

    net.eval()  # eval mode

    # first off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    chars.append(char)

    # now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)
print(sample(net, 1000, prime='Anna', top_k=5))
with open('rnn_x_epoch.net', 'rb') as f:
    checkpoint = torch.load(f)

loaded = CharRNN(checkpoint['tokens'], n_hidden=checkpoint['n_hidden'], n_layers=checkpoint['n_layers'])
loaded.load_state_dict(checkpoint['state_dict'])
print(sample(loaded, 2000, top_k=5, prime="And Levin said"))